package ex.tableapi;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

/**
 * Flink Table API example: a user-defined {@link TableAggregateFunction} that emits
 * the top-2 values of a column ("Top2"), applied on top of a SQL windowed aggregation.
 *
 * <p>Pipeline: read clicks from a CSV file, count clicks per user in 10-second
 * tumbling event-time windows via SQL, then run the Top-2 table aggregate via the
 * Table API (Flink SQL has limited support for table aggregate functions).
 */
public class UdfTest_TableAggregateFunction {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1. Define the event-time attribute directly in the CREATE TABLE DDL.
        //    FROM_UNIXTIME converts epoch seconds to a 'yyyy-MM-dd HH:mm:ss' string
        //    (hence ts / 1000 for millisecond timestamps); TO_TIMESTAMP turns that
        //    string into a TIMESTAMP usable as the event-time attribute.
        String createDDL = "CREATE TABLE clickTable (" +
                "user_name STRING," +
                "url STRING," +
                "ts BIGINT," +
                "et AS TO_TIMESTAMP( FROM_UNIXTIME(ts / 1000))," + // event-time column
                " WATERMARK FOR et AS et - INTERVAL '1' SECOND " + // watermark delayed by 1 second
                ")WITH(" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'data/input/clicks.txt'," +
                " 'format' = 'csv'" +
                ")";

        tableEnv.executeSql(createDDL);

        // 2. Register the user-defined table aggregate function under the name "Top2".
        tableEnv.createTemporarySystemFunction("Top2", Top2.class);

        // 3. SQL step: per-user click counts over 10-second tumbling windows
        //    (windowing table-valued function syntax).
        String windowAggQuery = "SELECT user_name,COUNT(url) AS cnt,window_start,window_end " +
                "FROM TABLE(" +
                " TUMBLE(TABLE clickTable,DESCRIPTOR(et),INTERVAL '10' SECOND)" +
                ")" +
                "GROUP BY user_name,window_start,window_end";

        Table aggTable = tableEnv.sqlQuery(windowAggQuery);
        tableEnv.toChangelogStream(aggTable).print("aggTable");

        // Flink SQL does not support table aggregate functions well, so the Top-2
        // step uses the Table API instead.
        // NOTE(review): grouping by BOTH user_name and window_end leaves at most one
        // cnt row per group, so Top2 only ever emits rank 1 here. A per-window Top-2
        // across users would group by window_end alone — confirm intent.
        Table resultTable = aggTable.groupBy($("user_name"), $("window_end"))
                .flatAggregate(call("Top2", $("cnt")).as("value", "rank"))
                .select($("user_name"), $("window_end"), $("value"), $("rank"));

        // 4. Convert the (retractable) result to a changelog stream and print it.
        tableEnv.toChangelogStream(resultTable).print("resultTable");

        env.execute();
    }

    /**
     * Accumulator holding the current largest and second-largest values.
     *
     * <p>Fields are public and mutable on purpose: Flink treats the accumulator as a
     * POJO and accesses the fields reflectively. {@code Long.MIN_VALUE} is used as the
     * "no value yet" sentinel.
     */
    public static class Top2Accumulator {
        public Long max;       // current largest value seen
        public Long secondMax; // current second-largest value seen
    }

    /**
     * Table aggregate function emitting up to two rows per group:
     * {@code (value, 1)} for the largest input and {@code (value, 2)} for the
     * second largest.
     */
    public static class Top2 extends TableAggregateFunction<Tuple2<Long, Integer>, Top2Accumulator> {

        /** Creates a fresh accumulator with both slots set to the sentinel. */
        @Override
        public Top2Accumulator createAccumulator() {
            Top2Accumulator acc = new Top2Accumulator();
            acc.max = Long.MIN_VALUE;
            acc.secondMax = Long.MIN_VALUE;
            return acc;
        }

        /**
         * Folds one input value into the accumulator, keeping the two largest
         * values seen so far.
         *
         * @param acc   accumulator to update in place
         * @param value next input value; null values are ignored (guards against an
         *              NPE on unboxing if the input column is nullable)
         */
        public void accumulate(Top2Accumulator acc, Long value) {
            if (value == null) {
                return;
            }
            if (value > acc.max) {
                acc.secondMax = acc.max;
                acc.max = value;
            } else if (value > acc.secondMax) {
                acc.secondMax = value;
            }
        }

        /**
         * Emits the current Top-2 as (value, rank) tuples. Slots still holding the
         * sentinel (no data yet) are skipped, so 0, 1 or 2 rows may be emitted.
         */
        public void emitValue(Top2Accumulator acc, Collector<Tuple2<Long, Integer>> out) {
            if (acc.max != Long.MIN_VALUE) {
                out.collect(Tuple2.of(acc.max, 1));
            }
            if (acc.secondMax != Long.MIN_VALUE) {
                out.collect(Tuple2.of(acc.secondMax, 2));
            }
        }
    }
}
